package com.zan.rocketmqdemo.demo;

import com.zan.rocketmqdemo.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.UUID;

/**
 * @Author Zan
 * @Create 2024/8/29 15:45
 * @ClassName: RetryTest
 * @Description : Demonstrates RocketMQ producer send retries, consumer re-consumption retries, and dead-letter queue handling
 */
public class RetryTest {

    /** Topic the demo producer writes to and the retrying consumers read from. */
    private static final String RETRY_TOPIC = "retryTopic";

    /** Consumer group whose failed messages are retried; its name also determines the dead-letter topic. */
    private static final String RETRY_CONSUMER_GROUP = "retry-consumer-group";

    /** Dead-letter topic of {@link #RETRY_CONSUMER_GROUP}: the "%DLQ%" prefix followed by the group name. */
    private static final String DEAD_LETTER_TOPIC = "%DLQ%" + RETRY_CONSUMER_GROUP;

    /**
     * Maximum number of re-consumptions configured on every consumer in this class.
     * The manual give-up check in {@link #retryDeadConsumer2()} uses the same value so the two cannot drift apart.
     */
    private static final int MAX_RECONSUME_TIMES = 2;

    /**
     * Sends one message to {@link #RETRY_TOPIC} with producer-side send retries configured.
     *
     * @throws Exception if the producer cannot be started or the send fails
     */
    @Test
    public void retryProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // Number of send retries (sync / async); configured before start() like the rest of the setup.
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.start();
        try {
            String key = UUID.randomUUID().toString();
            System.out.println(key);
            // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
            byte[] body = "我是vip1的文章".getBytes(StandardCharsets.UTF_8);
            Message message = new Message(RETRY_TOPIC, "vip1", key, body);
            producer.send(message);
            System.out.println("发送成功");
        } finally {
            // Release the producer even when send() throws.
            producer.shutdown();
        }
    }

    /**
     * Consumer that always fails, to observe the re-consumption schedule.
     *
     * <p>Retry intervals grow level by level:
     * "10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" (16 retries by default).
     * <ol>
     *   <li>The retry count can be customised with {@code setMaxReconsumeTimes(n)}.</li>
     *   <li>A message that still fails after all retries is a dead-letter message and is moved to the
     *       dead-letter topic ({@code %DLQ%retry-consumer-group}).</li>
     *   <li>For ways to handle such failures see {@link #retryDeadConsumer()} and
     *       {@link #retryDeadConsumer2()}.</li>
     * </ol>
     *
     * @throws Exception if the consumer cannot be started or reading stdin fails
     */
    @Test
    public void retryConsumer() throws Exception {
        DefaultMQPushConsumer consumer = newConsumer(RETRY_CONSUMER_GROUP, RETRY_TOPIC);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(new Date());
                    System.out.println(decodeBody(messageExt));
                }
                // Simulated business failure: returning null or RECONSUME_LATER requests a retry.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        runUntilInput(consumer);
    }

    /**
     * Strategy 1: listen to the dead-letter topic directly, record each message and notify a human.
     *
     * @throws Exception if the consumer cannot be started or reading stdin fails
     */
    @Test
    public void retryDeadConsumer() throws Exception {
        DefaultMQPushConsumer consumer = newConsumer("retry-dead-consumer-group", DEAD_LETTER_TOPIC);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(new Date());
                    System.out.println(decodeBody(messageExt));
                    System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
                }
                // The dead letter has been recorded, so acknowledge it.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        runUntilInput(consumer);
    }

    /**
     * Strategy 2 (more common): inspect the failure in the listener and, once the retry budget is used up,
     * record the message and acknowledge it so that it never reaches the dead-letter topic.
     *
     * @throws Exception if the consumer cannot be started or reading stdin fails
     */
    @Test
    public void retryDeadConsumer2() throws Exception {
        DefaultMQPushConsumer consumer = newConsumer(RETRY_CONSUMER_GROUP, RETRY_TOPIC);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    System.out.println(new Date());
                    try {
                        // Business logic - fails on purpose.
                        handleDb();
                    } catch (RuntimeException e) {
                        System.out.println("consume failed: " + e);
                        // The give-up threshold must not exceed the configured maxReconsumeTimes;
                        // otherwise the message is dead-lettered before this branch can ever run.
                        if (messageExt.getReconsumeTimes() < MAX_RECONSUME_TIMES) {
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                        // Retry budget exhausted: record the message and fall through to acknowledge it.
                        System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        runUntilInput(consumer);
    }

    /** Builds a push consumer for {@code group}, subscribed to every tag of {@code topic}. */
    private static DefaultMQPushConsumer newConsumer(String group, String topic) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        consumer.subscribe(topic, "*");
        // Demo value; the original author suggests 5-7 retries in general.
        consumer.setMaxReconsumeTimes(MAX_RECONSUME_TIMES);
        return consumer;
    }

    /** Starts {@code consumer}, blocks until something is read from stdin, then always shuts it down. */
    private static void runUntilInput(DefaultMQPushConsumer consumer) throws Exception {
        consumer.start();
        try {
            // Keep the test thread alive so the listener threads can keep receiving messages.
            System.in.read();
        } finally {
            consumer.shutdown();
        }
    }

    /** Decodes the message body as UTF-8, matching the encoding used by {@link #retryProducer()}. */
    private static String decodeBody(MessageExt messageExt) {
        return new String(messageExt.getBody(), StandardCharsets.UTF_8);
    }

    /** Simulated database operation that always fails with an {@link ArithmeticException}. */
    private void handleDb() {
        System.out.println("处理DB");
        throw new ArithmeticException("/ by zero");
    }
}
